- Import¶
In [ ]:
%matplotlib inline
import torch
import torch.nn as nn
import pandas as pd
import numpy as np
import torch.nn.functional as F
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader
from PIL import Image
from torch import autograd
from torch.autograd import Variable
from torchvision.utils import make_grid
import matplotlib.pyplot as plt
import os
import shutil
import torchvision.utils as vutils
import matplotlib.animation as animation
from IPython.display import HTML
In [ ]:
# Select GPU when available. NOTE(review): `device` is a plain string
# ('cuda'/'cpu'), not a torch.device object — code that later reads
# `device.type` would fail.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print('torch version:',torch.__version__)
print('device:', device)
torch version: 2.2.1+cu121 device: cuda
- Parameters¶
In [ ]:
# Peek at the dataset root (notebook cell — the returned listing was the cell output).
os.listdir('./data/archive/chest_xray')

# Function to create a csv containing image path and labels from folders of images
def create_csv(data_dir, csv_name):
    """Scan `data_dir` for class sub-folders and write an (image, label) CSV.

    Parameters
    ----------
    data_dir : str
        Directory (with trailing slash, as the callers pass it) containing
        'NORMAL' and 'PNEUMONIA' sub-folders of images.
    csv_name : str
        Output CSV filename; also used as the pie-chart title.

    Returns
    -------
    pandas.DataFrame with columns ['image', 'label'].
    """
    data = []
    for folder in os.listdir(data_dir):
        if folder in ['NORMAL', 'PNEUMONIA']:
            for file in os.listdir(data_dir + folder):
                data.append([data_dir + folder + '/' + file, folder])
    df = pd.DataFrame(data, columns=['image', 'label'])
    # Write the CSV exactly once (the original wrote the same file twice).
    df.to_csv(csv_name, index=False)
    print('CSV created')
    # Report the number of instances per class.
    print(df['label'].value_counts())
    # Pie chart of the class percentages.
    plt.figure(figsize=(5, 5))
    df['label'].value_counts().plot(kind='pie', autopct='%1.1f%%').set_title(csv_name)
    return df
In [ ]:
# Build the per-split CSVs (and their class-balance pie charts) in one pass.
train_csv, val_csv, test_csv = (
    create_csv(f'./data/archive/chest_xray/{split}/', f'{split}.csv')
    for split in ('train', 'val', 'test')
)
CSV created label PNEUMONIA 3875 NORMAL 1341 Name: count, dtype: int64 CSV created label NORMAL 8 PNEUMONIA 8 Name: count, dtype: int64 CSV created label PNEUMONIA 390 NORMAL 234 Name: count, dtype: int64
In [ ]:
# Data
train_data_path = './train.csv' # Path of training-split CSV
# NOTE(review): validation path points at test.csv, not val.csv — the tiny
# 16-image val split is skipped; confirm this is intentional.
valid_data_path = './test.csv' # Path of validation-split CSV
print('Train data path:', train_data_path)
print('Valid data path:', valid_data_path)
img_size = 64 # Side length images are resized to
batch_size = 64 # Batch size
# Model
# NOTE(review): z_size and the layer-size lists below appear superseded by
# the DCGAN hyperparameters (nz, ngf, ndf) defined in a later cell — confirm.
z_size = 100
generator_layer_size = [256, 512, 1024]
discriminator_layer_size = [1024, 512, 256]
# Training
# NOTE(review): the DCGAN cell later sets num_epochs=20 and lr=2e-4, which
# are what the training loop actually uses.
epochs = 30 # Train epochs
learning_rate = 1e-4
Train data path: ./train.csv Valid data path: ./test.csv
- Pytorch Dataset, DataLoader: Chest Xray¶
In [ ]:
# Class names; position in this list is the integer label used by the
# dataset below (0 = NORMAL, 1 = PNEUMONIA).
class_list = ['NORMAL', 'PNEUMONIA']
class_num = len(class_list)  # number of classes (2)
In [ ]:
# Function to find normalizing parameters for the data listed in a CSV.
def find_mean_std(data_path):
    """Average the per-image pixel mean and std over every image in the CSV.

    Pixels are scaled to [0, 1] before the statistics are taken. Note this
    returns the mean of per-image stds, not the true pixel-wise dataset std.
    The hard-coded Normalize constants used later presumably came from
    running this on the training CSV — confirm.
    """
    df = pd.read_csv(data_path)
    per_image_mean = []
    per_image_std = []
    for path in df['image']:
        pixels = np.array(Image.open(path)) / 255
        per_image_mean.append(np.mean(pixels))
        per_image_std.append(np.std(pixels))
    return np.mean(per_image_mean), np.mean(per_image_std)
In [ ]:
# Pytorch dataset that loads (image, label) pairs from a CSV of paths/labels.
class ChestXrayDataset(Dataset):
    """Dataset backed by a CSV whose first column is an image path and whose
    second column is a class name ('NORMAL' or 'PNEUMONIA')."""

    def __init__(self, csv_file, transform=None):
        self.data = pd.read_csv(csv_file)
        self.transform = transform

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        path = self.data.iloc[idx, 0]
        raw_label = self.data.iloc[idx, 1]
        image = Image.open(path).convert('RGB')
        # PNEUMONIA -> 1, anything else (NORMAL) -> 0
        label = int(raw_label == 'PNEUMONIA')
        if self.transform is not None:
            image = self.transform(image)
        return image, label
# Preprocessing: resize to img_size x img_size (64x64), convert to a [0, 1]
# tensor, then normalize. NOTE(review): mean/std are 1-element tuples, which
# transforms.Normalize broadcasts across all 3 RGB channels; the values
# presumably come from find_mean_std on the training set — confirm.
transform = transforms.Compose([
    transforms.Resize((img_size, img_size)),
    transforms.ToTensor(),
    transforms.Normalize(mean=(0.354,), std=(0.4435,))
])
train_data = ChestXrayDataset(train_data_path, transform)
# Built but note valid_data_path points at test.csv (see parameters cell).
valid_data = ChestXrayDataset(valid_data_path, transform)
# drop_last=True discards the final partial batch, so every batch holds
# exactly batch_size images.
train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True,drop_last=True)
In [ ]:
# Sanity-check the DataLoader: pull a single batch, print its shape and
# labels, and display the images, then stop.
for i, (images, labels) in enumerate(train_loader):
    print(images.shape, labels)
    # Show images
    plt.figure(figsize=(16, 24))
    plt.axis("off")
    plt.title("Training Images")
    # make_grid tiles the batch 8 per row; transpose CHW -> HWC for imshow.
    plt.imshow(np.transpose(make_grid(images, nrow=8).cpu(), (1, 2, 0)))
    break
Clipping input data to the valid range for imshow with RGB data ([0..1] for floats or [0..255] for integers).
torch.Size([64, 3, 64, 64]) tensor([1, 1, 1, 1, 1, 0, 0, 1, 1, 1, 0, 1, 1, 0, 0, 0, 1, 1, 1, 0, 0, 1, 0, 1,
1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 0, 0, 0, 1, 1, 1, 0, 1,
1, 0, 0, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1])
Implementation using Deep Convolutional GAN (DCGAN)¶
In [ ]:
# Number of workers for dataloader.
# NOTE(review): train_loader above was created without num_workers, so this
# value appears unused — confirm.
workers = 2
# Batch size during training (re-assigns the same value set earlier)
batch_size = 64
# Spatial size of training images. All images will be resized to this
# size using a transformer.
image_size = 64
# Number of channels in the training images. For color images this is 3
nc = 3
# Size of z latent vector (i.e. size of generator input)
nz = 100
# Size of feature maps in generator
ngf = 64
# Size of feature maps in discriminator
ndf = 64
# Number of training epochs
num_epochs = 20
# Learning rate for optimizers
lr = 0.0002
# Beta1 hyperparameter for Adam optimizers
beta1 = 0.5
# Number of GPUs available. Use 0 for CPU mode.
ngpu = 1
- Generator¶
In [ ]:
# Custom weight initialization applied to ``netG`` and ``netD`` via
# ``net.apply``: Conv layers get N(0, 0.02) weights; BatchNorm layers get
# N(1, 0.02) weights and zero bias.
def weights_init(m):
    layer_name = m.__class__.__name__
    if 'Conv' in layer_name:
        nn.init.normal_(m.weight.data, 0.0, 0.02)
    elif 'BatchNorm' in layer_name:
        nn.init.normal_(m.weight.data, 1.0, 0.02)
        nn.init.constant_(m.bias.data, 0)
class Generator(nn.Module):
    """DCGAN generator.

    Upsamples a latent vector of shape (N, nz, 1, 1) to an image of shape
    (N, nc, 64, 64) through a stack of strided transposed convolutions.
    The sizes were previously read from module-level globals; they are now
    keyword parameters whose defaults equal those globals, so existing
    ``Generator(ngpu)`` calls behave identically.

    Parameters
    ----------
    ngpu : int
        Number of GPUs (kept for parity with the DCGAN tutorial; not read
        inside this class).
    nz : int
        Latent vector size (default 100).
    ngf : int
        Base number of generator feature maps (default 64).
    nc : int
        Number of output image channels (default 3).
    """

    def __init__(self, ngpu, nz=100, ngf=64, nc=3):
        super(Generator, self).__init__()
        self.ngpu = ngpu
        self.main = nn.Sequential(
            # (N, nz, 1, 1) -> (N, ngf*8, 4, 4)
            nn.ConvTranspose2d(nz, ngf * 8, 4, 1, 0, bias=False),
            nn.BatchNorm2d(ngf * 8),
            nn.ReLU(True),
            # -> (N, ngf*4, 8, 8)
            nn.ConvTranspose2d(ngf * 8, ngf * 4, 4, 2, 1, bias=False),
            nn.BatchNorm2d(ngf * 4),
            nn.ReLU(True),
            # -> (N, ngf*2, 16, 16)
            nn.ConvTranspose2d(ngf * 4, ngf * 2, 4, 2, 1, bias=False),
            nn.BatchNorm2d(ngf * 2),
            nn.ReLU(True),
            # -> (N, ngf, 32, 32)
            nn.ConvTranspose2d(ngf * 2, ngf, 4, 2, 1, bias=False),
            nn.BatchNorm2d(ngf),
            nn.ReLU(True),
            # -> (N, nc, 64, 64), values squashed to [-1, 1] by tanh
            nn.ConvTranspose2d(ngf, nc, 4, 2, 1, bias=False),
            nn.Tanh()
        )

    def forward(self, input):
        return self.main(input)
In [ ]:
# Create the generator and move it to the selected device
netG = Generator(ngpu).to(device)
# # Handle multi-GPU if desired
# NOTE(review): `device` is a string here, so `device.type` in the commented
# code below would raise AttributeError if re-enabled.
# if (device.type == 'cuda') and (ngpu > 1):
#     netG = nn.DataParallel(netG, list(range(ngpu)))
# Apply the ``weights_init`` function to randomly initialize all weights
# to ``mean=0``, ``stdev=0.02``.
netG.apply(weights_init)
# Print the model
print(netG)
Generator(
(main): Sequential(
(0): ConvTranspose2d(100, 512, kernel_size=(4, 4), stride=(1, 1), bias=False)
(1): BatchNorm2d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(2): ReLU(inplace=True)
(3): ConvTranspose2d(512, 256, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1), bias=False)
(4): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(5): ReLU(inplace=True)
(6): ConvTranspose2d(256, 128, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1), bias=False)
(7): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(8): ReLU(inplace=True)
(9): ConvTranspose2d(128, 64, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1), bias=False)
(10): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(11): ReLU(inplace=True)
(12): ConvTranspose2d(64, 3, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1), bias=False)
(13): Tanh()
)
)
- Discriminator¶
In [ ]:
class Discriminator(nn.Module):
    """DCGAN discriminator.

    Maps an image of shape (N, nc, 64, 64) to a real/fake probability of
    shape (N, 1, 1, 1) via strided convolutions and a final sigmoid.
    The sizes were previously read from module-level globals; they are now
    keyword parameters whose defaults equal those globals, so existing
    ``Discriminator(ngpu)`` calls behave identically.

    Parameters
    ----------
    ngpu : int
        Number of GPUs (kept for parity with the DCGAN tutorial; not read
        inside this class).
    nc : int
        Number of input image channels (default 3).
    ndf : int
        Base number of discriminator feature maps (default 64).
    """

    def __init__(self, ngpu, nc=3, ndf=64):
        super(Discriminator, self).__init__()
        self.ngpu = ngpu
        self.main = nn.Sequential(
            # (N, nc, 64, 64) -> (N, ndf, 32, 32); no BatchNorm on input layer
            nn.Conv2d(nc, ndf, 4, 2, 1, bias=False),
            nn.LeakyReLU(0.2, inplace=True),
            # -> (N, ndf*2, 16, 16)
            nn.Conv2d(ndf, ndf * 2, 4, 2, 1, bias=False),
            nn.BatchNorm2d(ndf * 2),
            nn.LeakyReLU(0.2, inplace=True),
            # -> (N, ndf*4, 8, 8)
            nn.Conv2d(ndf * 2, ndf * 4, 4, 2, 1, bias=False),
            nn.BatchNorm2d(ndf * 4),
            nn.LeakyReLU(0.2, inplace=True),
            # -> (N, ndf*8, 4, 4)
            nn.Conv2d(ndf * 4, ndf * 8, 4, 2, 1, bias=False),
            nn.BatchNorm2d(ndf * 8),
            nn.LeakyReLU(0.2, inplace=True),
            # -> (N, 1, 1, 1), probability via sigmoid (paired with BCELoss)
            nn.Conv2d(ndf * 8, 1, 4, 1, 0, bias=False),
            nn.Sigmoid()
        )

    def forward(self, input):
        return self.main(input)
In [ ]:
# Create the Discriminator and move it to the selected device
netD = Discriminator(ngpu).to(device)
# Handle multi-GPU if desired
# NOTE(review): `device` is a string here, so `device.type` in the commented
# code below would raise AttributeError if re-enabled.
# if (device.type == 'cuda') and (ngpu > 1):
#     netD = nn.DataParallel(netD, list(range(ngpu)))
# Apply the ``weights_init`` function to randomly initialize all weights
# like this: ``to mean=0, stdev=0.02``.
netD.apply(weights_init)
# Print the model
print(netD)
Discriminator(
(main): Sequential(
(0): Conv2d(3, 64, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1), bias=False)
(1): LeakyReLU(negative_slope=0.2, inplace=True)
(2): Conv2d(64, 128, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1), bias=False)
(3): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(4): LeakyReLU(negative_slope=0.2, inplace=True)
(5): Conv2d(128, 256, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1), bias=False)
(6): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(7): LeakyReLU(negative_slope=0.2, inplace=True)
(8): Conv2d(256, 512, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1), bias=False)
(9): BatchNorm2d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
(10): LeakyReLU(negative_slope=0.2, inplace=True)
(11): Conv2d(512, 1, kernel_size=(4, 4), stride=(1, 1), bias=False)
(12): Sigmoid()
)
)
- Adversarial Learning of Generator & Discriminator¶
In [ ]:
# Initialize the ``BCELoss`` function (matches the Sigmoid output of netD)
criterion = nn.BCELoss()
# Create batch of latent vectors that we will use to visualize
# the progression of the generator; fixed so snapshots are comparable
# across epochs
fixed_noise = torch.randn(64, nz, 1, 1, device=device)
# Establish convention for real and fake labels during training
real_label = 1.
fake_label = 0.
# Setup Adam optimizers for both G and D, using the lr and beta1 set above
optimizerD = torch.optim.Adam(netD.parameters(), lr=lr, betas=(beta1, 0.999))
optimizerG = torch.optim.Adam(netG.parameters(), lr=lr, betas=(beta1, 0.999))
In [ ]:
# Training Loop
# Bookkeeping: generator snapshots, per-iteration losses, global step count.
img_list = []
G_losses = []
D_losses = []
iters = 0
print("Starting Training Loop...")
# For each epoch
for epoch in range(num_epochs):
    # For each batch in the dataloader
    for i, data in enumerate(train_loader, 0):
        ############################
        # (1) Update D network: maximize log(D(x)) + log(1 - D(G(z)))
        ###########################
        ## Train with all-real batch
        netD.zero_grad()
        # Format batch (data is (images, labels); the class labels are unused)
        real_cpu = data[0].to(device)
        b_size = real_cpu.size(0)
        label = torch.full((b_size,), real_label, dtype=torch.float, device=device)
        # Forward pass real batch through D
        output = netD(real_cpu).view(-1)
        # Calculate loss on all-real batch
        errD_real = criterion(output, label)
        # Calculate gradients for D in backward pass
        errD_real.backward()
        D_x = output.mean().item()
        ## Train with all-fake batch
        # Generate batch of latent vectors
        noise = torch.randn(b_size, nz, 1, 1, device=device)
        # Generate fake image batch with G
        fake = netG(noise)
        label.fill_(fake_label)
        # Classify all fake batch with D; detach() so no gradients flow to G here
        output = netD(fake.detach()).view(-1)
        # Calculate D's loss on the all-fake batch
        errD_fake = criterion(output, label)
        # Calculate the gradients for this batch, accumulated (summed) with previous gradients
        errD_fake.backward()
        D_G_z1 = output.mean().item()
        # Compute error of D as sum over the fake and the real batches
        errD = errD_real + errD_fake
        # Update D
        optimizerD.step()
        ############################
        # (2) Update G network: maximize log(D(G(z)))
        ###########################
        netG.zero_grad()
        label.fill_(real_label)  # fake labels are real for generator cost
        # Since we just updated D, perform another forward pass of all-fake batch through D
        output = netD(fake).view(-1)
        # Calculate G's loss based on this output
        errG = criterion(output, label)
        # Calculate gradients for G
        errG.backward()
        D_G_z2 = output.mean().item()
        # Update G
        optimizerG.step()
        # Output training stats every 50 batches
        if i % 50 == 0:
            print('[%d/%d][%d/%d]\tLoss_D: %.4f\tLoss_G: %.4f\tD(x): %.4f\tD(G(z)): %.4f / %.4f'
                  % (epoch, num_epochs, i, len(train_loader),
                     errD.item(), errG.item(), D_x, D_G_z1, D_G_z2))
        # Save Losses for plotting later
        G_losses.append(errG.item())
        D_losses.append(errD.item())
        # Check how the generator is doing by saving G's output on fixed_noise
        # (every 500 iterations, plus once on the very last batch)
        if (iters % 500 == 0) or ((epoch == num_epochs-1) and (i == len(train_loader)-1)):
            with torch.no_grad():
                fake = netG(fixed_noise).detach().cpu()
            img_list.append(vutils.make_grid(fake, padding=2, normalize=True))
        iters += 1
Starting Training Loop... [0/20][0/82] Loss_D: 1.7407 Loss_G: 2.3106 D(x): 0.2827 D(G(z)): 0.2423 / 0.1295 [0/20][50/82] Loss_D: 0.0124 Loss_G: 16.2756 D(x): 0.9894 D(G(z)): 0.0000 / 0.0000 [1/20][0/82] Loss_D: 4.7998 Loss_G: 24.3907 D(x): 0.9899 D(G(z)): 0.9777 / 0.0000 [1/20][50/82] Loss_D: 0.1183 Loss_G: 3.6803 D(x): 0.9385 D(G(z)): 0.0312 / 0.0342 [2/20][0/82] Loss_D: 0.2286 Loss_G: 3.9317 D(x): 0.9118 D(G(z)): 0.0998 / 0.0230 [2/20][50/82] Loss_D: 0.4289 Loss_G: 6.1952 D(x): 0.8957 D(G(z)): 0.2427 / 0.0042 [3/20][0/82] Loss_D: 1.4852 Loss_G: 2.9272 D(x): 0.3349 D(G(z)): 0.0064 / 0.0953 [3/20][50/82] Loss_D: 0.8346 Loss_G: 4.1261 D(x): 0.7215 D(G(z)): 0.2580 / 0.0251 [4/20][0/82] Loss_D: 0.6448 Loss_G: 2.7380 D(x): 0.7312 D(G(z)): 0.1719 / 0.0856 [4/20][50/82] Loss_D: 0.6988 Loss_G: 5.2372 D(x): 0.6154 D(G(z)): 0.0138 / 0.0138 [5/20][0/82] Loss_D: 0.5077 Loss_G: 5.3517 D(x): 0.9037 D(G(z)): 0.2959 / 0.0103 [5/20][50/82] Loss_D: 0.3918 Loss_G: 4.2161 D(x): 0.9563 D(G(z)): 0.2669 / 0.0228 [6/20][0/82] Loss_D: 0.3618 Loss_G: 4.4418 D(x): 0.8715 D(G(z)): 0.1738 / 0.0209 [6/20][50/82] Loss_D: 0.4324 Loss_G: 4.2630 D(x): 0.9370 D(G(z)): 0.2632 / 0.0308 [7/20][0/82] Loss_D: 0.5491 Loss_G: 4.9970 D(x): 0.8226 D(G(z)): 0.2418 / 0.0195 [7/20][50/82] Loss_D: 0.3357 Loss_G: 4.1042 D(x): 0.9055 D(G(z)): 0.1946 / 0.0263 [8/20][0/82] Loss_D: 0.6935 Loss_G: 2.6154 D(x): 0.6329 D(G(z)): 0.0398 / 0.1278 [8/20][50/82] Loss_D: 0.4503 Loss_G: 5.9750 D(x): 0.9674 D(G(z)): 0.3067 / 0.0106 [9/20][0/82] Loss_D: 0.7605 Loss_G: 7.4600 D(x): 0.9330 D(G(z)): 0.4423 / 0.0037 [9/20][50/82] Loss_D: 0.5188 Loss_G: 3.6532 D(x): 0.6751 D(G(z)): 0.0260 / 0.0569 [10/20][0/82] Loss_D: 0.5745 Loss_G: 7.1958 D(x): 0.9068 D(G(z)): 0.3416 / 0.0028 [10/20][50/82] Loss_D: 0.4096 Loss_G: 3.3341 D(x): 0.8156 D(G(z)): 0.1316 / 0.0893 [11/20][0/82] Loss_D: 0.7752 Loss_G: 2.6726 D(x): 0.5755 D(G(z)): 0.0471 / 0.1064 [11/20][50/82] Loss_D: 0.2457 Loss_G: 4.8308 D(x): 0.8451 D(G(z)): 0.0525 / 0.0127 
[12/20][0/82] Loss_D: 0.3273 Loss_G: 3.6274 D(x): 0.8262 D(G(z)): 0.0987 / 0.0385 [12/20][50/82] Loss_D: 0.5837 Loss_G: 5.7864 D(x): 0.9705 D(G(z)): 0.3896 / 0.0086 [13/20][0/82] Loss_D: 0.4423 Loss_G: 3.0666 D(x): 0.7822 D(G(z)): 0.1315 / 0.0785 [13/20][50/82] Loss_D: 0.2739 Loss_G: 3.0641 D(x): 0.9315 D(G(z)): 0.1696 / 0.0680 [14/20][0/82] Loss_D: 0.7148 Loss_G: 6.2585 D(x): 0.9269 D(G(z)): 0.4287 / 0.0045 [14/20][50/82] Loss_D: 0.4180 Loss_G: 4.8717 D(x): 0.9475 D(G(z)): 0.2835 / 0.0170 [15/20][0/82] Loss_D: 0.2832 Loss_G: 3.9347 D(x): 0.8723 D(G(z)): 0.1018 / 0.0393 [15/20][50/82] Loss_D: 0.3909 Loss_G: 3.9928 D(x): 0.8750 D(G(z)): 0.2004 / 0.0261 [16/20][0/82] Loss_D: 0.5567 Loss_G: 6.0385 D(x): 0.9308 D(G(z)): 0.3533 / 0.0049 [16/20][50/82] Loss_D: 0.4988 Loss_G: 2.5990 D(x): 0.6964 D(G(z)): 0.0415 / 0.0999 [17/20][0/82] Loss_D: 0.9614 Loss_G: 4.0359 D(x): 0.5219 D(G(z)): 0.0234 / 0.0839 [17/20][50/82] Loss_D: 0.4242 Loss_G: 3.2042 D(x): 0.7702 D(G(z)): 0.0610 / 0.0543 [18/20][0/82] Loss_D: 1.1599 Loss_G: 7.4466 D(x): 0.9450 D(G(z)): 0.6023 / 0.0028 [18/20][50/82] Loss_D: 0.2849 Loss_G: 4.6865 D(x): 0.9165 D(G(z)): 0.1560 / 0.0126 [19/20][0/82] Loss_D: 0.4205 Loss_G: 3.4973 D(x): 0.7462 D(G(z)): 0.0666 / 0.0660 [19/20][50/82] Loss_D: 0.3038 Loss_G: 3.8652 D(x): 0.8375 D(G(z)): 0.0802 / 0.0320
In [ ]:
# Save model weights (state_dicts only, reloadable via load_state_dict)
torch.save(netG.state_dict(), 'generator.pth')
torch.save(netD.state_dict(), 'discriminator.pth')
Plot Loss and images¶
In [ ]:
# Plot the per-iteration G and D losses collected during training.
plt.figure(figsize=(10,5))
plt.title("Generator and Discriminator Loss During Training")
plt.plot(G_losses,label="G")
plt.plot(D_losses,label="D")
plt.xlabel("iterations")
plt.ylabel("Loss")
plt.legend()
plt.show()
In [ ]:
# Animate generator progress: one frame per snapshot stored in img_list.
fig = plt.figure(figsize=(14,14))
plt.axis("off")
# Each img_list entry is a CHW grid tensor; transpose to HWC for imshow.
ims = [[plt.imshow(np.transpose(i,(1,2,0)), animated=True)] for i in img_list]
ani = animation.ArtistAnimation(fig, ims, interval=1000, repeat_delay=1000, blit=True)
HTML(ani.to_jshtml())
Out[ ]: